www.gusucode.com > VC++ ICPQ聊天室源程序-源码程序 > VC++ ICPQ聊天室源程序-源码程序/code/ChatRoomSever/ChatServer.cpp
#include "stdAfx.h" #include "ChatServer.h" // Download by http://www.NewXing.com // Global varible declare ChatServer* g_pchatserver = NULL; ULNode* g_puserlist = NULL; HANDLE g_hClientInfoMutex; CString g_smsg; DWORD WINAPI ChatServer::ServerWorkerThread(LPVOID CompletionPortID) { HANDLE CompletionPort = (HANDLE) CompletionPortID; DWORD BytesTransferred; CLIENT_OBJ* clientobj; DWORD SendBytes, RecvBytes; DWORD Flags; BOOL hasnamed = FALSE; int rc; char errormsg[30]; while(TRUE) { if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (LPDWORD)&clientobj,(LPOVERLAPPED*)&clientobj, INFINITE) == 0) { rc = GetLastError(); if(rc != ERROR_NETNAME_DELETED) { wsprintf(errormsg,"GetQueuedCompletionStatus failed with error %d", rc); ::MessageBox(NULL,errormsg,"Error",MB_OK); return -1; } } EnterCriticalSection(&clientobj->SockCritSec); switch(clientobj->optype) { case OP_IORead: if (BytesTransferred == 0) //client quit from chat room { char name[MAX_NAME_SIZE],leavemsg[50]; BOOL renamed = FALSE; strcpy(name,clientobj->clientname); if (closesocket(clientobj->sclient) == SOCKET_ERROR) { wsprintf(errormsg,"closesocket() failed with error %d", WSAGetLastError()); ::MessageBox(NULL,errormsg,"Error",MB_OK); return -1; } clientobj->sclient = INVALID_SOCKET; renamed = clientobj->firstrecv; g_pchatserver->FreeClientObj(clientobj); if(!renamed) //if there is a repetitive name error , system will not broadcast exit message { BUFFER_OBJ *newobj = (BUFFER_OBJ *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(BUFFER_OBJ)); if (newobj == NULL) { wsprintf(errormsg, "PublicSendBufffer: HeapAlloc failed: %d", GetLastError()); ::MessageBox(NULL,errormsg,"Error",MB_OK); return -1; } wsprintf(newobj->DataBuffer,"[系统消息]%s离开聊天室。\r\n",name); g_pchatserver->SendPublicMessage(newobj); wsprintf(leavemsg,"*********[客户%s退出聊天室]*********\r\n",name); g_smsg = leavemsg; ::PostMessage(g_pchatserver->m_showmsgdlg,WM_UPDATEMSG,NULL,NULL); continue; } } else //broadcast chat message or client login message { clientobj->broadcast = TRUE; if(clientobj->firstrecv) { strncpy(clientobj->clientname,clientobj->RecvDataBuf.buf,BytesTransferred); WaitForSingleObject(g_hClientInfoMutex,INFINITE); g_pchatserver->m_clientcount++; if(g_puserlist) { ULNode* ulptr = g_puserlist; while(ulptr) { if(!strcmp(ulptr->szUsername,clientobj->clientname)) {//if there is a repetitive name, close socket and wait client quit hasnamed = TRUE; clientobj->optype = OP_IORead; ZeroMemory(&(clientobj->ol), sizeof(OVERLAPPED)); clientobj->RecvDataBuf.len = DEFAULT_BUFFER_SIZE; clientobj->RecvDataBuf.buf = clientobj->recvbuf->DataBuffer; Flags = 0; if (WSARecv(clientobj->sclient, &(clientobj->RecvDataBuf), 1, &RecvBytes, &Flags, &(clientobj->ol), NULL) == SOCKET_ERROR) { if (WSAGetLastError() != ERROR_IO_PENDING) { wsprintf(errormsg,"WSARecv() failed with error %d", WSAGetLastError()); ::MessageBox(NULL,errormsg,"Error",MB_OK); return -1; } } shutdown(clientobj->sclient,SD_SEND); break; } ulptr = ulptr->next; } } ReleaseMutex(g_hClientInfoMutex); if(hasnamed) { hasnamed = FALSE; continue; } else { SYSTEMTIME time; char welmsg[50]; GetSystemTime(&time); wsprintf(clientobj->time,"%d:%d:%d",time.wHour+8,time.wMinute,time.wSecond); wsprintf(welmsg,"[系统消息]欢迎%s进入...\r\n",clientobj->clientname); strcpy(clientobj->recvbuf->DataBuffer,welmsg); g_pchatserver->UpdateUserList(clientobj); wsprintf(welmsg,"*********[客户%s进入聊天室]*********\r\n",clientobj->clientname); g_smsg = welmsg; ::SendMessage(g_pchatserver->m_showmsgdlg,WM_UPDATEMSG,NULL,NULL); clientobj->firstrecv = FALSE; } } else { g_smsg += '['; g_smsg += clientobj->clientname; g_smsg += "]:"; g_smsg += clientobj->recvbuf->DataBuffer; g_smsg += "\r\n"; ::SendMessage(g_pchatserver->m_showmsgdlg,WM_UPDATEMSG,NULL,NULL); char tmp[DEFAULT_BUFFER_SIZE]; wsprintf(tmp,"[%s]:%s",clientobj->clientname,clientobj->recvbuf->DataBuffer); strcpy(clientobj->recvbuf->DataBuffer,tmp); } clientobj->sendbuf = clientobj->recvbuf; //fecth receive buffer as send buffer clientobj->recvbuf = g_pchatserver->GetBufferObj(); //get new receive buffer WaitForSingleObject(g_pchatserver->m_hlinkmutex,INFINITE); //wait for finishing CLIENT_OBJ *ptr = g_pchatserver->m_Client; clientobj->sendbuf->sendcount = g_pchatserver->m_clientcount; while(ptr) //go through all client and send broadcast { ptr->optype = OP_IOWrite; ZeroMemory(&(ptr->ol), sizeof(OVERLAPPED)); ptr->SendDataBuf.buf = clientobj->sendbuf->DataBuffer; ptr->SendDataBuf.len = DEFAULT_BUFFER_SIZE; ptr->sendbuf = clientobj->sendbuf; if (WSASend(ptr->sclient, &(ptr->SendDataBuf), 1, &SendBytes, 0, &(ptr->ol), NULL) == SOCKET_ERROR) { if (WSAGetLastError() != ERROR_IO_PENDING) { wsprintf(errormsg,"WSASend() failed with error %d", WSAGetLastError()); ::MessageBox(NULL,errormsg,"Error",MB_OK); return -1; } } ptr = ptr->next; } ReleaseMutex(g_pchatserver->m_hlinkmutex); } break; case OP_IOWrite: clientobj->optype = OP_IORead; if(clientobj->sendbuf->sendcount != 0) clientobj->sendbuf->sendcount--; if(clientobj->sendbuf->sendcount ==0) {//Send Message Complete Successfully // ::MessageBox(NULL,"Test","Debug",MB_OK); g_pchatserver->FreeBufferObj(clientobj->sendbuf); clientobj->sendbuf = NULL; } if(clientobj->broadcast) //if it's the message sender, then start receive new message { ZeroMemory(&(clientobj->ol), sizeof(OVERLAPPED)); clientobj->RecvDataBuf.len = DEFAULT_BUFFER_SIZE; clientobj->RecvDataBuf.buf = clientobj->recvbuf->DataBuffer; Flags = 0; if (WSARecv(clientobj->sclient, &(clientobj->RecvDataBuf), 1, &RecvBytes, &Flags, &(clientobj->ol), NULL) == SOCKET_ERROR) { if (WSAGetLastError() != ERROR_IO_PENDING) { wsprintf(errormsg,"WSARecv() failed with error %d", WSAGetLastError()); ::MessageBox(NULL,errormsg,"Error",MB_OK); return -1; } } clientobj->broadcast = FALSE; } break; } LeaveCriticalSection(&clientobj->SockCritSec); } } ChatServer::ChatServer() {//initialize m_InternetAddr.sin_family = AF_INET; m_InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY); m_InternetAddr.sin_port = htons(PORT); m_hKillEvent = CreateEvent(NULL, TRUE, FALSE, NULL); m_Client = NULL; m_userlistdlg = NULL; m_clientcount = 0; g_hClientInfoMutex = CreateMutex(NULL,FALSE,"UserListMutext"); m_hlinkmutex = CreateMutex(NULL,FALSE,"LinkTableMutex"); } ChatServer::~ChatServer() { CloseHandle(g_hClientInfoMutex); CloseHandle(m_hlinkmutex); } int ChatServer::Run() { DWORD ThreadID; WSADATA wsaData; DWORD Ret; DWORD RecvBytes; DWORD Flags; int addrlen; //client ip address length if ((Ret = WSAStartup(0x0202, &wsaData)) != 0) { wsprintf(errormsg,"WSAStartup failed with error %d", Ret); ::MessageBox(NULL,errormsg,"Error",MB_OK); return -1; } // Setup an I/O completion port. if ((m_CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)) == NULL) { sprintf(errormsg,"CreateIoCompletionPort failed with error: %d", GetLastError()); ::MessageBox(NULL,errormsg,"Error",MB_OK); return -1; } // Determine how many processors are on the system. GetSystemInfo(&m_SystemInfo); // Create worker threads based on the number of processors available on the // system. Create two worker threads for each processor. for(unsigned int i = 0; i < m_SystemInfo.dwNumberOfProcessors * 2; i++) { HANDLE ThreadHandle; // Create a server worker thread and pass the completion port to the thread. if ((ThreadHandle = CreateThread(NULL, 0, ServerWorkerThread, m_CompletionPort, 0, &ThreadID)) == NULL) { wsprintf(errormsg,"CreateThread() failed with error %d", GetLastError()); ::MessageBox(NULL,errormsg,"Error",MB_OK); return -1; } // Close the thread handle CloseHandle(ThreadHandle); } // Create a listening socket if ((m_Listen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET) { wsprintf(errormsg,"WSASocket() failed with error %d", WSAGetLastError()); ::MessageBox(NULL,errormsg,"Error",MB_OK); return -1; } if (bind(m_Listen, (PSOCKADDR) &m_InternetAddr, sizeof(m_InternetAddr)) == SOCKET_ERROR) { wsprintf(errormsg,"bind() failed with error %d", WSAGetLastError()); ::MessageBox(NULL,errormsg,"Error",MB_OK); return -1; } // Prepare socket for listening if (listen(m_Listen, 5) == SOCKET_ERROR) { wsprintf(errormsg,"listen() failed with error %d", WSAGetLastError()); ::MessageBox(NULL,errormsg,"Error",MB_OK); return -1; } // Accept connections and assign to the completion port. while(WAIT_TIMEOUT == WaitForSingleObject(m_hKillEvent, 0)) //when m_killevent is set , server shutdown { CLIENT_OBJ *newclientobj = GetClientObj(); addrlen = sizeof(newclientobj->addressinfo); if ((newclientobj->sclient = WSAAccept(m_Listen,(SOCKADDR *)&(newclientobj->addressinfo), &addrlen, NULL, 0)) == SOCKET_ERROR) { wsprintf(errormsg,"WSAAccept() failed with error %d", WSAGetLastError()); ::MessageBox(NULL,errormsg,"Error",MB_OK); return -1; } newclientobj->firstrecv = TRUE; //user login in WaitForSingleObject(m_hlinkmutex,INFINITE); if(m_Client == NULL) m_Client = newclientobj;//there is no user else { newclientobj->next = m_Client; m_Client = newclientobj; } ReleaseMutex(m_hlinkmutex); newclientobj->optype = OP_IORead; newclientobj->recvbuf = GetBufferObj(); //associate the client socket with completion-port if (CreateIoCompletionPort((HANDLE)newclientobj->sclient,m_CompletionPort, (DWORD)newclientobj, 0) == NULL) { wsprintf(errormsg,"CreateIoCompletionPort failed with error %d", GetLastError()); ::MessageBox(NULL,errormsg,"Error",MB_OK); return -1; } ZeroMemory(&(newclientobj->ol), sizeof(OVERLAPPED)); newclientobj->RecvDataBuf.len = DEFAULT_BUFFER_SIZE; newclientobj->RecvDataBuf.buf = newclientobj->recvbuf->DataBuffer; Flags = 0; //start receive ... if (WSARecv(newclientobj->sclient,&(newclientobj->RecvDataBuf),1, &RecvBytes, &Flags, &(newclientobj->ol), NULL) == SOCKET_ERROR) { if (WSAGetLastError() != ERROR_IO_PENDING) { wsprintf(errormsg,"WSARecv() failed with error %d", WSAGetLastError()); ::MessageBox(NULL,errormsg,"Error",MB_OK); return -1; } } } CloseHandle(m_hKillEvent); return 0; } void ChatServer::ShutDown() { SetEvent(m_hKillEvent); } BUFFER_OBJ* ChatServer::GetBufferObj() { BUFFER_OBJ *newobj=NULL; // Allocate the object newobj = (BUFFER_OBJ *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(BUFFER_OBJ)); if (newobj == NULL) { wsprintf(errormsg, "GetBufferObj: HeapAlloc failed: %d", GetLastError()); ::MessageBox(NULL,errormsg,"Error",MB_OK); ExitProcess(-1); } newobj->sendcount = 0; return newobj; } void ChatServer::FreeBufferObj(BUFFER_OBJ *obj) { HeapFree(GetProcessHeap(), 0, obj); } CLIENT_OBJ* ChatServer::GetClientObj() { CLIENT_OBJ *newobj = NULL; newobj = (CLIENT_OBJ *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(CLIENT_OBJ)); if (newobj == NULL) { wsprintf(errormsg, "GetSocketObj: HeapAlloc failed: %d", GetLastError()); ::MessageBox(NULL,errormsg,"Error",MB_OK); ExitProcess(-1); } InitializeCriticalSection(&newobj->SockCritSec); newobj->firstrecv = FALSE; newobj->sendbuf = NULL; newobj->broadcast = FALSE; newobj->next = NULL; return newobj; } void ChatServer::FreeClientObj(CLIENT_OBJ *obj) { BOOL renamedeal =FALSE; WaitForSingleObject(m_hlinkmutex,INFINITE); char clientname[MAX_NAME_SIZE]; strcpy(clientname,obj->clientname); if(obj->firstrecv) renamedeal = TRUE; //handle the same login name // ::MessageBox(NULL,clientname,NULL,MB_OK); if(obj == m_Client) { m_Client = m_Client->next; } else { CLIENT_OBJ *cliptr = m_Client; while(cliptr) { if(obj == cliptr->next) break; cliptr = cliptr->next; } cliptr->next = obj->next; } m_clientcount--; ReleaseMutex(m_hlinkmutex); DeleteCriticalSection(&(obj->SockCritSec)); HeapFree(GetProcessHeap(), 0, obj->RecvDataBuf.buf); HeapFree(GetProcessHeap(), 0, obj); if(!renamedeal) { WaitForSingleObject(g_hClientInfoMutex,INFINITE); ULNode* tmp = NULL; if(!strcmp(g_puserlist->szUsername,clientname)) { tmp = g_puserlist; g_puserlist = g_puserlist->next; } else { ULNode* ulptr = g_puserlist; while(ulptr) { if(!strcmp(ulptr->next->szUsername,clientname)) break; ulptr = ulptr->next ; } tmp = ulptr->next ; ulptr->next = tmp->next; } ReleaseMutex(g_hClientInfoMutex); HeapFree(GetProcessHeap(), 0,tmp); if(m_userlistdlg) ::PostMessage(m_userlistdlg,WM_USERQUIT,NULL,NULL); //Send user quit message to UserList dialog } } void ChatServer::UpdateUserList(CLIENT_OBJ *obj) { ULNode *newobj = NULL; newobj = (ULNode *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(ULNode)); if (newobj == NULL) { wsprintf(errormsg, "GetUserListNodeObj: HeapAlloc failed: %d", GetLastError()); ::MessageBox(NULL,errormsg,"Error",MB_OK); ExitProcess(-1); } newobj->next = NULL; strcpy(newobj->szUsername,obj->clientname); strcpy(newobj->sztime,obj->time); wsprintf(newobj->szIP,"%s", inet_ntoa(obj->addressinfo.sin_addr)); WaitForSingleObject(g_hClientInfoMutex,INFINITE); if(g_puserlist == NULL) g_puserlist = newobj; else { newobj->next = g_puserlist; g_puserlist = newobj; } ReleaseMutex(g_hClientInfoMutex); if(m_userlistdlg) ::PostMessage(m_userlistdlg,WM_USERIN,NULL,NULL); //Send user login message to UserList dialog } void ChatServer::SetUserListDlgHandle(HWND dlg) { m_userlistdlg = dlg; } void ChatServer::SetShowMsgDlgHandle(HWND dlg) { m_showmsgdlg = dlg; } void ChatServer::LetUserOut(char* name) { WaitForSingleObject(m_hlinkmutex,INFINITE); CLIENT_OBJ *ptr = m_Client; while(ptr) { if(!strcmp(ptr->clientname,name)) { shutdown(ptr->sclient,SD_SEND); break; } ptr = ptr->next; } ReleaseMutex(m_hlinkmutex); } //send system message void ChatServer::SendPublicMessage(BUFFER_OBJ *msg) { DWORD SendBytes; WaitForSingleObject(g_pchatserver->m_hlinkmutex,INFINITE); CLIENT_OBJ *ptr = g_pchatserver->m_Client; msg->sendcount = m_clientcount; while(ptr) { ptr->optype = OP_IOWrite; ZeroMemory(&(ptr->ol), sizeof(OVERLAPPED)); ptr->SendDataBuf.buf = msg->DataBuffer; ptr->SendDataBuf.len = DEFAULT_BUFFER_SIZE; ptr->sendbuf = msg; if (WSASend(ptr->sclient, &(ptr->SendDataBuf), 1, &SendBytes, 0, &(ptr->ol), NULL) == SOCKET_ERROR) { if (WSAGetLastError() != ERROR_IO_PENDING) { wsprintf(errormsg,"WSASend() failed with error %d", WSAGetLastError()); ::MessageBox(NULL,errormsg,"Error",MB_OK); return; } } ptr = ptr->next; } ReleaseMutex(g_pchatserver->m_hlinkmutex); }